package com.yzq.modular.message.service.impl;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.yzq.modular.core.CacheOperatorApi;
import com.yzq.modular.core.CommonService;
import com.yzq.modular.core.PageInfo;
import com.yzq.modular.datasync.service.DataSyncService;
import com.yzq.modular.demo.service.TestTranslationService;
import com.yzq.modular.message.entity.MessageReport;
import com.yzq.modular.message.entity.MessageReport1;
import com.yzq.modular.message.service.IMessageReport1Service;
import com.yzq.modular.message.service.IMessageReportService;
import com.yzq.modular.message.service.MessageTranslationService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;

import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Service that converts data from the old table ({@link MessageReport}) to the new table
 * ({@link MessageReport1}).
 *
 * <p>Two modes are offered:
 * <ul>
 *   <li>{@link #translation()} – full, resumable, page-by-page copy whose progress is kept in the
 *       {@code pageInfoCache} cache under the key {@code message_report};</li>
 *   <li>{@link #incrementalProcessing()} – copies rows whose report time is at or after the
 *       configured end time of the full sync, inserting only rows not yet present in the
 *       new table.</li>
 * </ul>
 *
 * @ProjectName: modular
 * @Author: yzq
 * @Date: 2022/4/18 14:03
 */
@Service
@Slf4j
public class MessageTranslationServiceImpl extends CommonService<MessageReport> implements MessageTranslationService {

    /** Number of rows handled per page. */
    private static final int PAGE_SIZE = 1500;

    /** Key under which the paging progress is stored in the cache. */
    private static final String TABLE_NAME = "message_report";

    /** Upper bound for report times in the incremental sync; keeps dirty data from exceeding the partition. */
    private static final LocalDateTime PARTITION_UPPER_BOUND = LocalDateTime.of(2025, 1, 1, 0, 0, 0);

    /** Cache holding the paging progress. */
    @Resource(name="pageInfoCache")
    private CacheOperatorApi<PageInfo> pageInfoCacheOperatorApi;

    @Resource
    private IMessageReportService iMessageReportService;

    @Resource
    private IMessageReport1Service iMessageReport1Service;

    @Resource
    private TransactionTemplate transactionTemplate;

    @Resource
    private DataSyncService dataSyncService;

    /**
     * Resolves the paging state to start (or resume) the full sync from.
     *
     * @return the cached state when one exists and is not finished; {@code null} when the cached
     *         state shows the sync already completed (current page is past the last page);
     *         otherwise a fresh state positioned on page 1, built from the current table total
     */
    @Override
    public PageInfo getPageInfo() {
        // Resume from the cached position when a previous run left one behind.
        PageInfo pageInfo = this.getCahe(TABLE_NAME);
        if (ObjectUtil.isNotEmpty(pageInfo)) {
            log.info("find tableName:{} pageCache, info:总数：{}，单页条数：{}，页数：{}，当前分页：{} "
                    ,TABLE_NAME,pageInfo.getTotal(),pageInfo.getPageSize(),pageInfo.getPageNum(),pageInfo.getCurrentPageNum());
            // The sync is finished once the cached current page is greater than the page count.
            if (pageInfo.getCurrentPageNum() > pageInfo.getPageNum()) {
                log.info("old-database 表{} is complate when {}",TABLE_NAME,pageInfo.getDate());
                return null;
            }
            return pageInfo;
        }

        // Nothing cached: derive the page count from the table total.
        int total = iMessageReportService.getTableTotal();
        int pageNum = total % PAGE_SIZE == 0 ? total / PAGE_SIZE : total / PAGE_SIZE + 1;

        return new PageInfo(PAGE_SIZE,
                pageNum,
                1,
                total,
                TABLE_NAME,
                new Date());
    }

    /**
     * Stores the given paging state in the cache under the {@code message_report} key.
     *
     * @param pageInfo paging state to store
     */
    @Override
    public void setCache(PageInfo pageInfo) {
        pageInfoCacheOperatorApi.put(TABLE_NAME, pageInfo);
    }

    /**
     * Reads the paging state cached under the given key.
     *
     * @param tableName cache key
     * @return whatever the cache returns for that key
     */
    @Override
    public PageInfo getCahe(String tableName) {
        return pageInfoCacheOperatorApi.get(tableName);
    }

    /**
     * Marks the sync as finished: moves the current page one past the last page, stamps the
     * current time, and stores the state in the cache.
     *
     * @param currentPageInfo paging state to mark as complete; it is modified in place
     */
    @Override
    public void complateCache(PageInfo currentPageInfo) {
        currentPageInfo.setCurrentPageNum(currentPageInfo.getPageNum() + 1);
        currentPageInfo.setDate(new Date());

        pageInfoCacheOperatorApi.put(TABLE_NAME, currentPageInfo);
    }

    /**
     * Loads one page of old-table rows whose report time lies strictly between the configured
     * start and end time of the full sync.
     *
     * @param pageInfo supplies the page size and the page number to load
     * @return the rows of the requested page; an empty list when the page query returns nothing
     * @throws IllegalStateException if the data-sync configuration has no rows
     */
    @Override
    public List<MessageReport> getOldInfo(PageInfo pageInfo) {
        Page<MessageReport> pageRequest = new Page<>();
        pageRequest.setSize(pageInfo.getPageSize());
        pageRequest.setCurrent(pageInfo.getCurrentPageNum());

        // Read both bounds from the same configuration row with a single query.
        Date[] window = readFirstRow(dataSyncService.list(),
                cfg -> new Date[]{cfg.getStartTime(), cfg.getEndTime()});
        LocalDateTime windowStart = DateUtil.toLocalDateTime(window[0]);
        LocalDateTime windowEnd = DateUtil.toLocalDateTime(window[1]);

        LambdaQueryWrapper<MessageReport> query = new LambdaQueryWrapper<>();
        query.isNotNull(MessageReport::getReportTime);
        query.lt(MessageReport::getReportTime, windowEnd);
        query.gt(MessageReport::getReportTime, windowStart);
        query.orderByAsc(MessageReport::getReportTime);
        // Tie-break on id: rows sharing a report time would otherwise have no defined order,
        // which lets offset pagination skip or repeat rows between pages.
        query.orderByAsc(MessageReport::getId);

        Page<MessageReport> result = iMessageReportService.page(pageRequest, query);
        if (result == null || result.getRecords() == null) {
            return new ArrayList<>();
        }
        return result.getRecords();
    }

    /**
     * Converts a batch of old-table rows to the new entity type and inserts them inside one
     * transaction, then logs the handled ids and the cached paging state.
     *
     * @param list old-table rows to convert; {@code null} or empty performs no insert
     */
    @Override
    public void translate(List<MessageReport> list) {
        List<MessageReport> source = list == null ? Collections.<MessageReport>emptyList() : list;

        // Old -> new conversion is a plain property copy.
        List<MessageReport1> converted = BeanUtil.copyToList(source, MessageReport1.class);

        // Skip the batch insert when there is nothing to write.
        if (ObjectUtil.isNotEmpty(converted)) {
            transactionTemplate.execute(status -> {
                iMessageReport1Service.insertMessageReports(converted);
                return null;
            });
        }

        log.info("message_report 完成数据同步 id 列表：{}", JSON.toJSONString(source.stream().map(MessageReport::getId).collect(Collectors.toList())));
        PageInfo pageInfo = this.getCahe(TABLE_NAME);
        log.info("message_report 当前分页信息：{}",JSONObject.toJSONString(pageInfo));
    }

    /**
     * Runs the full sync through the inherited {@code template()} and logs the row total from
     * the cached paging state together with the elapsed time.
     */
    @Override
    public void translation() {
        long startTime = System.currentTimeMillis();
        this.template();
        long elapsedMs = System.currentTimeMillis() - startTime;

        PageInfo pageInfo = this.getCahe(TABLE_NAME);
        if (pageInfo == null) {
            // No cached state to read the total from; report the timing only instead of failing.
            log.warn("message_report 同步结束，但未找到分页缓存，共用时（ms）{}", elapsedMs);
            return;
        }
        log.info("message_report 完成数据同步 共{}条数据，共用时（ms）{}", pageInfo.getTotal(), elapsedMs);
    }

    /**
     * Incremental sync: walks every old-table row whose report time is between the configured
     * end time of the full sync (inclusive) and the partition upper bound (inclusive), and inserts
     * into the new table only the rows whose id is not already there. Existing rows are never
     * modified.
     *
     * @author yzq
     * @date 2024/10/26
     * @throws IllegalStateException if the data-sync configuration has no rows
     */
    @Override
    public void incrementalProcessing() {
        // The end time of the full sync is where the incremental sync starts.
        Date fullSyncEnd = readFirstRow(dataSyncService.list(), cfg -> cfg.getEndTime());
        LocalDateTime incrementStart = DateUtil.toLocalDateTime(fullSyncEnd);

        LambdaQueryWrapper<MessageReport> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.isNotNull(MessageReport::getReportTime);
        queryWrapper.ge(MessageReport::getReportTime, incrementStart);
        queryWrapper.le(MessageReport::getReportTime, PARTITION_UPPER_BOUND);
        queryWrapper.orderByAsc(MessageReport::getReportTime);
        // Tie-break on id so paging order is fully defined when report times collide.
        queryWrapper.orderByAsc(MessageReport::getId);

        Integer count = iMessageReportService.count(queryWrapper);
        if (count == null || count <= 0) {
            return;
        }

        Page<MessageReport> pageRequest = new Page<>();
        pageRequest.setSize(PAGE_SIZE);
        pageRequest.setCurrent(1);

        while (true) {
            Page<MessageReport> result = iMessageReportService.page(pageRequest, queryWrapper);
            List<MessageReport> records = result == null ? null : result.getRecords();
            // An empty page means we have walked past the last row: this is the only exit.
            if (records == null || records.isEmpty()) {
                break;
            }

            List<MessageReport1> candidates = BeanUtil.copyToList(records, MessageReport1.class);
            List<MessageReport1> toInsert = filterNotYetSynced(candidates);
            if (!toInsert.isEmpty()) {
                transactionTemplate.execute(status -> {
                    iMessageReport1Service.insertMessageReports(toInsert);
                    return null;
                });
            }

            // Always advance, even when the whole page was already synced, so later pages
            // containing new rows are still reached.
            pageRequest.setCurrent(pageRequest.getCurrent() + 1);
        }

        log.info("增量同步完成，共{}条数据", count);
    }

    /**
     * Keeps only the candidates whose id is not yet present in the new table.
     *
     * @param candidates converted rows of one page; must not be empty
     * @return the candidates that still need to be inserted, in their original order
     */
    private List<MessageReport1> filterNotYetSynced(List<MessageReport1> candidates) {
        List<String> candidateIds = candidates.stream()
                .map(MessageReport1::getId)
                .collect(Collectors.toList());
        List<MessageReport1> alreadyStored = iMessageReport1Service.listByIds(candidateIds);
        if (alreadyStored == null || alreadyStored.isEmpty()) {
            return candidates;
        }

        Set<String> storedIds = alreadyStored.stream()
                .map(MessageReport1::getId)
                .collect(Collectors.toSet());
        return candidates.stream()
                .filter(candidate -> !storedIds.contains(candidate.getId()))
                .collect(Collectors.toList());
    }

    /**
     * Applies {@code reader} to the first element of {@code rows}.
     *
     * @param rows   rows to read from
     * @param reader extracts the wanted value(s) from the first row
     * @return the value produced by {@code reader}
     * @throws IllegalStateException if {@code rows} is {@code null} or empty
     */
    private static <T, R> R readFirstRow(List<T> rows, Function<T, R> reader) {
        if (rows == null || rows.isEmpty()) {
            throw new IllegalStateException("data sync configuration is empty: dataSyncService.list() returned no rows");
        }
        return reader.apply(rows.get(0));
    }
}
